package com.atguigu.flink.chapter07.timer;

import com.atguigu.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Demo job: emits a warning for a sensor when its water level keeps rising for 5 seconds
 * of event time.
 *
 * <p>Input lines are read from a socket in the form {@code id,ts,vc}. The second field is
 * multiplied by 1000 before being used as the event timestamp, and the third field is the
 * water level.
 *
 * <p>For every sensor id an event-time timer is registered 5 seconds after the first reading.
 * Whenever a reading is not higher than the previous one, the pending timer is cancelled and a
 * new one is registered 5 seconds after that reading. If the timer fires, the warning is emitted.
 *
 * <p>The timer timestamp and the last water level are kept in keyed state, so each sensor id
 * is tracked independently even though a single function instance processes many keys.
 */
public class Flink01_Timer_Project {

    /** Length of the observation window, in milliseconds of event time. */
    private static final long ALARM_INTERVAL_MS = 5000L;

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<String> main = env
                .socketTextStream("hadoop162", 9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(data[0], Long.parseLong(data[1]) * 1000, Integer.parseInt(data[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((ws, ts) -> ws.getTs())
                )
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Timestamp of the pending alarm timer for the current key;
                    // empty means no timer is pending (first reading, or the timer already fired).
                    private transient ValueState<Long> timerTsState;
                    // Water level of the previous reading for the current key.
                    private transient ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        timerTsState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("timerTs", Long.class));
                        lastVcState = getRuntimeContext()
                                .getState(new ValueStateDescriptor<>("lastVc", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Long pendingTimerTs = timerTsState.value();
                        Integer lastVc = lastVcState.value();
                        Integer currentVc = value.getVc();

                        if (pendingTimerTs == null) {
                            // First reading for this key (or first one after an alarm): start the window.
                            registerAlarm(ctx);
                        } else if (lastVc != null && currentVc <= lastVc) {
                            // The level did not rise: cancel the pending timer and restart the window.
                            ctx.timerService().deleteEventTimeTimer(pendingTimerTs);
                            registerAlarm(ctx);
                        }
                        lastVcState.update(currentVc);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " 5s内水位连续上升， 预警...");
                        // Reset the key so that its next reading starts a fresh window.
                        timerTsState.clear();
                        lastVcState.clear();
                    }

                    // Registers an event-time timer ALARM_INTERVAL_MS after the current element
                    // and remembers its timestamp for the current key.
                    private void registerAlarm(Context ctx) throws Exception {
                        long alarmTs = ctx.timestamp() + ALARM_INTERVAL_MS;
                        ctx.timerService().registerEventTimeTimer(alarmTs);
                        timerTsState.update(alarmTs);
                    }
                });

        main.print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
